/*
Copyright (c) 2023 China Mobile Information Technology Co., Ltd
OpenGauss Operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
         http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package service

import (
	"fmt"
	v12 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
	gsv1 "openGauss-operator/api/v1"
	customClient "openGauss-operator/internal/client"
	"openGauss-operator/internal/util"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
)

// Compile-time check that *ClusterManagement satisfies ClusterManagementInterface.
var _ ClusterManagementInterface = (*ClusterManagement)(nil)

// ClusterManagementInterface is the set of cluster-level operations exposed by
// ClusterManagement for an OpenGaussCluster resource.
type ClusterManagementInterface interface {
	// ReconcileOpenGaussClusterAndStatus reconciles the PV/PVC, service and pod
	// resources of gs and records the observed pod state in gsStatus.
	ReconcileOpenGaussClusterAndStatus(gs *gsv1.OpenGaussCluster, gsStatus *gsv1.OpenGaussClusterStatus) error
	// MaintainClusterStatus reconciles the cluster's resources, runs the
	// maintenance sub-reconcilers and updates the cluster status.
	MaintainClusterStatus(cluster *gsv1.OpenGaussCluster) error
	// DeleteDataAfterReleaseCluster manages the cluster finalizer and, once the
	// cluster is being deleted, removes its data and persistent volumes.
	DeleteDataAfterReleaseCluster(gs *gsv1.OpenGaussCluster) error
}

// ClusterManagement implements ClusterManagementInterface. It holds the
// Kubernetes client and event recorder plus one handler per maintenance
// feature (switchover, resource adjustment, backup/recovery, standby
// reconstruction, instance migration and log collection); the handlers are
// wired up by NewClusterManagement.
type ClusterManagement struct {
	K8sClient          customClient.K8sClient
	Record             record.EventRecorder
	switchover         SwitchoverInterface
	resourceAdjust     ResourceAdjustInterface
	backupRecovery     BackupRecoveryInterface
	standbyReconstruct StandbyReconstructInterface
	instanceMigration  InstanceMigrationInterface
	logCollection      LogCollectionInterface
}

// NewClusterManagement builds a ClusterManagement around the given client and
// event recorder and wires up every feature handler it needs.
func NewClusterManagement(client customClient.K8sClient, record record.EventRecorder) ClusterManagementInterface {
	// The resource adjuster is given the switchover handler, so that one is
	// created first and shared between both fields.
	sw := NewSwitchover(client)

	mgr := &ClusterManagement{
		K8sClient:  client,
		Record:     record,
		switchover: sw,
	}
	mgr.resourceAdjust = NewResourceAdjust(client, sw)
	mgr.backupRecovery = NewBackupRecovery(client)
	mgr.standbyReconstruct = NewStandbyReconstruct(client)
	mgr.instanceMigration = NewInstanceMigration(client)
	mgr.logCollection = NewLogCollection(client)
	return mgr
}

// MaintainClusterStatus reconciles the base resources of gs, runs the
// maintenance sub-reconcilers and finally writes the derived cluster phase
// back through updateOpenGaussClusterStatus.
//
// A cluster whose phase is still ClusterPhaseNone is only moved to
// ClusterPhaseCreating. For every other phase the sub-reconcilers run in a
// fixed order; apart from initMsg their errors are logged and do not abort the
// pass.
func (r *ClusterManagement) MaintainClusterStatus(gs *gsv1.OpenGaussCluster) error {
	observed := &gsv1.OpenGaussClusterStatus{}

	// Check the cluster's base resources and create whatever is missing.
	if err := r.ReconcileOpenGaussClusterAndStatus(gs, observed); err != nil {
		return err
	}

	// No phase recorded yet: mark the cluster as creating and stop here.
	if gs.Status.State == util.ClusterPhaseNone {
		observed.State = util.ClusterPhaseCreating
		if err := r.updateOpenGaussClusterStatus(gs, observed, nil); err != nil {
			return err
		}
		return nil
	}

	// Initialise permissions.
	if err := r.initMsg(gs); err != nil {
		return err
	}

	// Backup, recovery and scheduled backup.
	if err := r.backupRecovery.ReconcileBackupRecovery(gs); err != nil {
		klog.Error(err)
	}

	// Standby reconstruction.
	if err := r.standbyReconstruct.ReconcileStandbyReconstruct(gs); err != nil {
		klog.Error(err)
	}

	// Node eviction / instance migration.
	if err := r.instanceMigration.ReconcileInstanceMigration(gs); err != nil {
		klog.Error(err)
	}

	// Primary/standby switchover.
	if err := r.switchover.Switchover(gs, ""); err != nil {
		klog.Error(err)
	}

	// Resource specification adjustment.
	if err := r.resourceAdjust.ClusterResourceChange(gs); err != nil {
		klog.Error(err)
	}

	// Log collection.
	if err := r.logCollection.ReconcileLogCollection(); err != nil {
		klog.Error(err)
	}

	// Fetch the cluster message; a failure is only logged and the (possibly
	// empty) map is still used below.
	clusterMsg, err := r.K8sClient.ClusterMsgConvertToMap(gs)
	if err != nil {
		klog.Error(err)
	}

	// The cluster counts as running when every data node is ready and the
	// reported cluster state is the running state.
	healthy := observed.CurrentReplicas == *gs.Spec.DataNodeNum &&
		clusterMsg["cluster_state"] == util.OpenGaussRunningState

	switch {
	case healthy:
		observed.State = util.ClusterPhaseRunning

	case gs.Status.State == util.ClusterPhaseRunning:
		// A previously running cluster is no longer healthy: flag it as
		// reconciling, emit a warning event and return an error. The original
		// note here reads "adds one reconcile every 10s".
		observed.State = util.ClusterPhaseReconciling
		if err := r.updateOpenGaussClusterStatus(gs, observed, clusterMsg); err != nil {
			klog.Error(err)
		}
		r.Record.Event(gs, v1.EventTypeWarning, "ClusterStatusAbnormal", "openGauss Cluster Status abnormal")
		return util.ReturnError("集群处于更新中，请等待... ...", nil)

	default:
		// Any other phase is carried over unchanged.
		observed.State = gs.Status.State
		if err := r.UpdateOpenGaussClusterStatus(gs, observed); err != nil {
			return err
		}
	}

	if err := r.updateOpenGaussClusterStatus(gs, observed, clusterMsg); err != nil {
		return err
	}
	return nil
}

// DeleteDataAfterReleaseCluster handles the cluster finalizer.
//
// While gs is not being deleted it makes sure the finalizer is present. Once a
// deletion timestamp is set it deletes the cluster data (only if the finalizer
// is still there), removes the finalizer, and then deletes the cluster's
// persistent volumes.
func (r *ClusterManagement) DeleteDataAfterReleaseCluster(gs *gsv1.OpenGaussCluster) error {
	hasFinalizer := util.ContainsStr

	if gs.DeletionTimestamp.IsZero() {
		// Live object: add the finalizer when it is missing.
		if hasFinalizer(gs.Finalizers, util.FinalizerName) {
			return nil
		}
		gs.Finalizers = append(gs.Finalizers, util.FinalizerName)
		if err := r.K8sClient.Update(gs); err != nil {
			return err
		}
		return nil
	}

	// Object is being deleted: wipe the data while the finalizer still guards it.
	if hasFinalizer(gs.Finalizers, util.FinalizerName) {
		if err := r.DeleteGSData(gs); err != nil {
			return err
		}
	}

	gs.Finalizers = util.RemoveStr(gs.Finalizers, util.FinalizerName)
	if err := r.K8sClient.Update(gs); err != nil {
		return err
	}

	// Finally remove the persistent volumes.
	return r.DeleteGaussClusterPV(gs)
}

// initMsg runs the one-time initialisation command on the primary pod and, when
// it succeeded, sets Spec.IsInit on the cluster object.
//
// Nothing happens unless the cluster is in the "Running" state and not yet
// initialised. A missing primary pod or a failed command is only logged and the
// function returns nil so that a later reconcile can retry. The only error
// returned is a failure to persist the IsInit flag.
//
// The command counts as successful when it returns no error, or when the
// returned error text contains "CREATE ROLE" or "already exists". The original
// code inspected err.Error() unconditionally, which panicked on a nil error.
func (r *ClusterManagement) initMsg(gs *gsv1.OpenGaussCluster) error {
	if gs.Spec.IsInit || gs.Status.State != "Running" {
		return nil
	}

	// Locate the primary pod.
	primaryPodName, err := r.K8sClient.GetPrimaryPodName(gs)
	if primaryPodName == "" || err != nil {
		klog.Errorf("Get Primary PodName err %v", err)
		return nil
	}

	// Run the initialisation command on the primary pod.
	_, err = r.K8sClient.ExecCommand(gs.Namespace, util.Command(util.GetInitCommand), primaryPodName)
	if err != nil {
		klog.Errorf("init err %s", err)
		msg := err.Error()
		if !strings.Contains(msg, "CREATE ROLE") && !strings.Contains(msg, "already exists") {
			// Real failure: leave IsInit untouched so the next pass retries.
			return nil
		}
	}

	// Initialisation is done: persist IsInit = true.
	modifyGs := gs.DeepCopy()
	modifyGs.Spec.IsInit = true
	if err := r.K8sClient.UpdateOpenGaussClusterObject(modifyGs, gs); err != nil {
		return err
	}
	return nil
}

// ReconcileOpenGaussClusterAndStatus reconciles, in order, the PV/PVC objects,
// the per-ordinal services, the pods (recording their state in gsStatus) and the
// headless services of gs. It stops at the first step that fails and returns
// that error.
func (r *ClusterManagement) ReconcileOpenGaussClusterAndStatus(gs *gsv1.OpenGaussCluster, gsStatus *gsv1.OpenGaussClusterStatus) error {
	steps := []func() error{
		// PersistentVolumes and PersistentVolumeClaims.
		func() error { return r.reconcilePvAndPvc(gs) },
		// Services.
		func() error { return r.reconcileAllTypeService(gs) },
		// Pods and their status.
		func() error { return r.reconcileAllTypePodAndStatus(gs, gsStatus) },
		// Headless services.
		func() error { return r.reconcileHeadlessService(gs) },
	}

	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}

// reconcileAllTypePodAndStatus reconciles the db pods, the cms pods (only for
// the split architecture) and the exporter pod, writing the observed pod state
// into gsStatus.
func (r *ClusterManagement) reconcileAllTypePodAndStatus(gs *gsv1.OpenGaussCluster, gsStatus *gsv1.OpenGaussClusterStatus) error {
	withCms := gs.Spec.Architecture == util.ArchitectureSplit

	// db pods
	if err := r.reconcileDbPod(gs, gsStatus); err != nil {
		return err
	}

	// cms pods
	if withCms {
		if err := r.reconcileCmsPod(gs, gsStatus); err != nil {
			return err
		}
	}

	// exporter pod
	return r.reconcileExporterPod(gs)
}

// reconcileAllTypePod reconciles the db, cms (split architecture only) and
// exporter pods of gs. The pod state is collected into a throw-away status
// object that is neither returned nor persisted.
func (r *ClusterManagement) reconcileAllTypePod(gs *gsv1.OpenGaussCluster) error {
	// Same sequence as reconcileAllTypePodAndStatus, with a scratch status.
	scratch := &gsv1.OpenGaussClusterStatus{}
	return r.reconcileAllTypePodAndStatus(gs, scratch)
}

// reconcileExporterPod creates the exporter pod of gs when it does not exist.
// An existing pod is left untouched; lookup errors other than NotFound and
// creation errors are logged and returned.
func (r *ClusterManagement) reconcileExporterPod(gs *gsv1.OpenGaussCluster) error {
	_, getErr := r.K8sClient.GetPod(gs.Namespace, GetExporterPodName(gs.Name))

	switch {
	case getErr == nil:
		// Pod already exists.
		return nil
	case !errors.IsNotFound(getErr):
		klog.Errorf("Get exporter pod err %s", getErr)
		return getErr
	}

	if err := r.K8sClient.Create(NewVersionedExporterPod(gs)); err != nil {
		klog.Errorf("Create exporter pod err %s", err)
		return err
	}
	return nil
}

// reconcileCmsPod makes sure util.CmsPodNum cms pods exist for gs and records
// their state in gsStatus.
//
// gsStatus.CmsConditions is replaced by a slice with one entry per cms ordinal,
// and gsStatus.CurrentCmsReplicas is incremented for every listed cms pod that
// is running and ready. Pods whose ordinal falls outside [0, util.CmsPodNum) are
// neither recorded nor deleted. The original code wrote the condition before
// checking the ordinal and so panicked with an index out of range for such pods.
func (r *ClusterManagement) reconcileCmsPod(gs *gsv1.OpenGaussCluster,
	gsStatus *gsv1.OpenGaussClusterStatus) error {

	cmsConditions := make([]gsv1.OpenGaussPodCondition, util.CmsPodNum)
	gsStatus.CmsConditions = cmsConditions

	pods, err := r.K8sClient.GetPodList(gs.Namespace, SetPodsLabelsByType(gs, util.PodTypeCMS))
	if err != nil {
		klog.Errorf("Get cms pod list err %s", err)
		return err
	}

	replicaCount := util.CmsPodNum
	replicas := make([]*v1.Pod, replicaCount)

	// Slot every existing pod by its ordinal.
	for i := range pods {
		ord, err := util.GetPodOrdinal(pods[i].Name)
		if err != nil {
			return err
		}

		if IsRunningAndReady(pods[i]) {
			gsStatus.CurrentCmsReplicas++
		}

		// Only ordinals inside the expected range own a condition/replica slot;
		// surplus pods are intentionally left alone here.
		if ord < 0 || ord >= replicaCount {
			continue
		}
		cmsConditions[ord] = *MakePodCondition(pods[i])
		replicas[ord] = pods[i]
	}

	// Build a pod object for every ordinal that has none yet.
	for ord := 0; ord < replicaCount; ord++ {
		if replicas[ord] == nil {
			replicas[ord] = NewVersionedCmsPod(gs, ord)
		}
	}

	// Create the pods that do not exist in the cluster yet.
	for i := range replicas {
		if IsCreated(replicas[i]) {
			continue
		}
		if err := r.K8sClient.Create(replicas[i]); err != nil {
			return err
		}
	}

	return nil
}

// reconcileDbPod brings the db pods of gs in line with Spec.DataNodeNum and
// records their state in gsStatus.
//
// gsStatus.Conditions is replaced by a slice with one entry per expected
// ordinal, and gsStatus.CurrentReplicas is incremented for every listed db pod
// that is running and ready (FixedNode is also called for those pods). Missing
// ordinals get a new pod; pods whose ordinal is at or above the desired count
// are deleted, highest ordinal first.
func (r *ClusterManagement) reconcileDbPod(gs *gsv1.OpenGaussCluster,
	gsStatus *gsv1.OpenGaussClusterStatus) error {

	podConditions := make([]gsv1.OpenGaussPodCondition, *gs.Spec.DataNodeNum)
	gsStatus.Conditions = podConditions

	existing, err := r.K8sClient.GetPodList(gs.Namespace, SetPodsLabelsByType(gs, util.PodTypeDB))
	if err != nil {
		klog.Errorf("Get db pod list err %s", err)
		return err
	}

	desired := int(*gs.Spec.DataNodeNum)
	slots := make([]*v1.Pod, desired)
	surplus := make([]*v1.Pod, 0, len(existing))

	for _, pod := range existing {
		ord, err := util.GetPodOrdinal(pod.Name)
		if err != nil {
			return err
		}

		if IsRunningAndReady(pod) {
			gsStatus.CurrentReplicas++
			if err := r.FixedNode(gs, pod); err != nil {
				return err
			}
		}

		switch {
		case ord >= desired:
			// Ordinal beyond the desired count: schedule the pod for deletion.
			surplus = append(surplus, pod)
		case ord >= 0:
			// Ordinal in range: the pod occupies its own slot.
			podConditions[ord] = *MakePodCondition(pod)
			slots[ord] = pod
		}
	}

	// Build a pod object for every ordinal that is still empty. All objects are
	// built before any of them is created, exactly as before.
	for ord := range slots {
		if slots[ord] == nil {
			slots[ord] = NewVersionedDbPod(gs, ord)
		}
	}

	// Create every pod that does not exist in the cluster yet.
	for _, pod := range slots {
		if IsCreated(pod) {
			continue
		}
		if err := r.K8sClient.Create(pod); err != nil {
			return err
		}
	}

	// Delete the surplus pods from the highest ordinal downwards.
	sort.Sort(ascendingOrdinal(surplus))
	for idx := len(surplus) - 1; idx >= 0; idx-- {
		victim := surplus[idx]
		if victim == nil {
			continue
		}
		if err := r.K8sClient.DeletePod(victim.Name, victim.Namespace); err != nil {
			return err
		}
	}

	return nil
}

// ascendingOrdinal implements sort.Interface over pods, ordering them by the
// ordinal parsed from the pod name, smallest first.
type ascendingOrdinal []*v1.Pod

// Len returns the number of pods.
func (ao ascendingOrdinal) Len() int { return len(ao) }

// Swap exchanges the pods at positions i and j.
func (ao ascendingOrdinal) Swap(i, j int) {
	tmp := ao[i]
	ao[i] = ao[j]
	ao[j] = tmp
}

// Less reports whether the pod at i has a smaller ordinal than the pod at j.
// It returns false when either entry is nil. Ordinal parse errors are ignored,
// so whatever value util.GetPodOrdinal returns alongside the error is compared.
func (ao ascendingOrdinal) Less(i, j int) bool {
	left, right := ao[i], ao[j]
	if left == nil || right == nil {
		return false
	}

	leftOrd, _ := util.GetPodOrdinal(left.Name)
	rightOrd, _ := util.GetPodOrdinal(right.Name)
	return leftOrd < rightOrd
}

// reconcileAllTypeService reconciles the db services and, for the split
// architecture only, the cms services of gs.
func (r *ClusterManagement) reconcileAllTypeService(gs *gsv1.OpenGaussCluster) error {
	withCms := gs.Spec.Architecture == util.ArchitectureSplit

	// db services
	if err := r.reconcileDbService(gs); err != nil {
		return err
	}

	// cms services exist only in the split architecture; skip them otherwise.
	if !withCms {
		return nil
	}
	return r.reconcileCmsService(gs)
}

// reconcileCmsService makes sure one cms service exists for every ordinal in
// [0, util.CmsPodNum), creating the missing ones. Existing services are matched
// to an ordinal through GetSvcOrdinal; services outside the range are ignored.
func (r *ClusterManagement) reconcileCmsService(gs *gsv1.OpenGaussCluster) error {
	found, err := r.K8sClient.GetServiceList(gs.Namespace, SetServicesLabelsByType(gs, util.PodTypeCMS))
	if err != nil {
		klog.Errorf("Get cms service list err %s", err)
		return err
	}

	replicaCount := util.CmsPodNum
	slots := make([]*v1.Service, replicaCount)

	// Slot the existing services by ordinal. As before, a parse error is only
	// reported when the returned ordinal lies inside the range.
	for _, svc := range found {
		ord, err := GetSvcOrdinal(svc)
		if ord < 0 || ord >= replicaCount {
			continue
		}
		if err != nil {
			return err
		}
		slots[ord] = svc
	}

	// Create a service for every empty slot.
	for ord := range slots {
		if slots[ord] != nil {
			continue
		}
		slots[ord] = NewCmsService(gs, ord)
		if err := r.K8sClient.Create(slots[ord]); err != nil {
			return err
		}
	}

	return nil
}

// reconcileDbService makes sure one db service exists for every ordinal in
// [0, Spec.DataNodeNum), creating the missing ones. Existing services are
// matched to an ordinal through GetSvcOrdinal; services outside the range are
// ignored.
func (r *ClusterManagement) reconcileDbService(gs *gsv1.OpenGaussCluster) error {
	found, err := r.K8sClient.GetServiceList(gs.Namespace, SetServicesLabelsByType(gs, util.PodTypeDB))
	if err != nil {
		klog.Errorf("Get db service list err %s", err)
		return err
	}

	replicaCount := int(*gs.Spec.DataNodeNum)
	slots := make([]*v1.Service, replicaCount)

	// Slot the existing services by ordinal. As before, a parse error is only
	// reported when the returned ordinal lies inside the range.
	for _, svc := range found {
		ord, err := GetSvcOrdinal(svc)
		if ord < 0 || ord >= replicaCount {
			continue
		}
		if err != nil {
			return err
		}
		slots[ord] = svc
	}

	// Create a service for every empty slot.
	for ord := range slots {
		if slots[ord] != nil {
			continue
		}
		slots[ord] = NewDbService(gs, ord)
		if err := r.K8sClient.Create(slots[ord]); err != nil {
			return err
		}
	}

	return nil
}

// reconcileHeadlessService walks every db and cms pod of gs and makes sure a
// service named after the pod exists: it is created when missing and updated
// with a freshly built headless service otherwise.
func (r *ClusterManagement) reconcileHeadlessService(gs *gsv1.OpenGaussCluster) error {
	dbPods, err := r.K8sClient.GetPodList(gs.Namespace, SetPodsLabelsByType(gs, util.PodTypeDB))
	if err != nil {
		return err
	}
	cmsPods, err := r.K8sClient.GetPodList(gs.Namespace, SetPodsLabelsByType(gs, util.PodTypeCMS))
	if err != nil {
		return err
	}

	for _, pod := range append(dbPods, cmsPods...) {
		_, getErr := r.K8sClient.GetService(gs.Namespace, pod.Name)

		switch {
		case getErr == nil:
			// Service exists: overwrite it with the desired definition.
			if err := r.K8sClient.Update(NewHeadlessService(gs, pod)); err != nil {
				return err
			}
		case errors.IsNotFound(getErr):
			// Service missing: create it.
			if err := r.K8sClient.Create(NewHeadlessService(gs, pod)); err != nil {
				return err
			}
		default:
			return getErr
		}
	}
	return nil
}

// reconcilePvAndPvc reconciles the PersistentVolumes of gs first and its
// PersistentVolumeClaims second, returning the first error encountered.
func (r *ClusterManagement) reconcilePvAndPvc(gs *gsv1.OpenGaussCluster) error {
	// PersistentVolumes
	if err := r.reconcilePv(gs); err != nil {
		return err
	}

	// PersistentVolumeClaims
	return r.reconcilePvc(gs)
}

// reconcilePvc makes sure a PersistentVolumeClaim exists for every db ordinal
// in [0, Spec.DataNodeNum) and every cms ordinal in [0, util.CmsPodNum),
// creating the missing ones. Existing claims are classified as cms or db with
// util.IsCmsResource and matched to an ordinal through GetPvcOrdinal; claims
// whose ordinal is outside the range are ignored.
func (r *ClusterManagement) reconcilePvc(gs *gsv1.OpenGaussCluster) error {
	pvcList, err := r.K8sClient.GetPvcList(gs.Namespace, SetPvAndPvcLabels(gs))
	if err != nil {
		// The message used to say "pv list", copied from reconcilePv.
		klog.Errorf("Get pvc list err %s", err)
		return err
	}

	replicaCount := int(*gs.Spec.DataNodeNum)
	dbReplicas := make([]*v1.PersistentVolumeClaim, replicaCount)
	cmsReplicas := make([]*v1.PersistentVolumeClaim, util.CmsPodNum)

	// Slot the existing claims by kind and ordinal. A parse error is only
	// reported when the returned ordinal lies inside the range.
	for _, pvc := range pvcList {
		slots := dbReplicas
		if util.IsCmsResource(pvc.Name) {
			slots = cmsReplicas
		}

		ord, err := GetPvcOrdinal(pvc.Name)
		if ord < 0 || ord >= len(slots) {
			continue
		}
		if err != nil {
			return err
		}
		slots[ord] = pvc
	}

	// Create the missing db claims.
	for ord := range dbReplicas {
		if dbReplicas[ord] != nil {
			continue
		}
		dbReplicas[ord] = PersistentVolumeClaim(gs, ord, util.PodTypeDB)
		if err := r.K8sClient.Create(dbReplicas[ord]); err != nil {
			return err
		}
	}

	// Create the missing cms claims.
	for ord := range cmsReplicas {
		if cmsReplicas[ord] != nil {
			continue
		}
		cmsReplicas[ord] = PersistentVolumeClaim(gs, ord, util.PodTypeCMS)
		if err := r.K8sClient.Create(cmsReplicas[ord]); err != nil {
			return err
		}
	}

	return nil
}

// reconcilePv makes sure a PersistentVolume exists for every db ordinal in
// [0, Spec.DataNodeNum) and every cms ordinal in [0, util.CmsPodNum), creating
// the missing ones. Existing volumes are classified as cms or db with
// util.IsCmsResource and matched to an ordinal through GetPvOrdinal; volumes
// whose ordinal is outside the range are ignored.
func (r *ClusterManagement) reconcilePv(gs *gsv1.OpenGaussCluster) error {
	volumes, err := r.K8sClient.GetPvList(SetPvAndPvcLabels(gs))
	if err != nil {
		klog.Errorf("Get pv list err %s", err)
		return err
	}

	dbCount := int(*gs.Spec.DataNodeNum)
	dbSlots := make([]*v1.PersistentVolume, dbCount)
	cmsSlots := make([]*v1.PersistentVolume, util.CmsPodNum)

	// Slot the existing volumes by kind and ordinal. A parse error is only
	// reported when the returned ordinal lies inside the range.
	for _, pv := range volumes {
		target := dbSlots
		if util.IsCmsResource(pv.Name) {
			target = cmsSlots
		}

		ord, err := GetPvOrdinal(pv.Name)
		if ord < 0 || ord >= len(target) {
			continue
		}
		if err != nil {
			return err
		}
		target[ord] = pv
	}

	// Create the missing db volumes.
	for ord := range dbSlots {
		if dbSlots[ord] != nil {
			continue
		}
		dbSlots[ord] = NewPersistentVolume(gs, ord, util.PodTypeDB)
		if err := r.K8sClient.Create(dbSlots[ord]); err != nil {
			return err
		}
	}

	// Create the missing cms volumes.
	for ord := range cmsSlots {
		if cmsSlots[ord] != nil {
			continue
		}
		cmsSlots[ord] = NewPersistentVolume(gs, ord, util.PodTypeCMS)
		if err := r.K8sClient.Create(cmsSlots[ord]); err != nil {
			return err
		}
	}

	return nil
}

// FixedNode pins the PersistentVolume belonging to pod's ordinal to the node the
// pod runs on by patching a node affinity onto the volume. A volume that already
// carries a node affinity is left unchanged.
func (r *ClusterManagement) FixedNode(gs *gsv1.OpenGaussCluster, pod *v1.Pod) error {
	ord, err := util.GetPodOrdinal(pod.Name)
	if err != nil {
		return err
	}

	current, err := r.K8sClient.GetPv(getPvName(gs.Name, ord), gs.Namespace)
	if err != nil {
		return err
	}

	// Already pinned: nothing to do.
	if current.Spec.NodeAffinity != nil {
		return nil
	}

	pinned := current.DeepCopy()
	pinned.Spec.NodeAffinity = setVolumeNodeAffinity(pod.Spec.NodeName)

	if err := r.K8sClient.PatchPv(pinned, current); err != nil {
		return err
	}
	return nil
}

// wg is the package-level WaitGroup that DeleteGSData used to synchronise on.
// DeleteGSData now uses a WaitGroup local to each call; the variable is kept
// only so that any other code in this package referring to it still compiles.
var wg = sync.WaitGroup{}

// DeleteGSData creates one clean-up Job per db and cms pod of gs. Each Job is
// scheduled onto the pod's node, mounts the cluster's host data directory and
// removes its contents. The function returns once every Job creation call has
// finished.
//
// Job creation is best effort: a failed creation is logged and does not make the
// function fail. Only a failure to list the pods is returned.
//
// Fixes over the previous version: Done() is now called by the goroutine that
// creates the Job, where before it was called by the launching loop and Wait()
// returned before the Jobs were created; the WaitGroup is local, so concurrent
// calls no longer share a counter; and the cms list failure is logged as such.
func (r *ClusterManagement) DeleteGSData(gs *gsv1.OpenGaussCluster) error {
	ttl := int32(60)
	dbPods, err := r.K8sClient.GetPodList(gs.Namespace, SetPodsLabelsByType(gs, util.PodTypeDB))
	if err != nil {
		klog.Errorf("Get db pod list err %s", err)
		return err
	}

	cmsPods, err := r.K8sClient.GetPodList(gs.Namespace, SetPodsLabelsByType(gs, util.PodTypeCMS))
	if err != nil {
		klog.Errorf("Get cms pod list err %s", err)
		return err
	}

	// NOTE(review): nothing is cleaned unless BOTH lists are non-empty. Elsewhere
	// in this file cms pods are only reconciled for the split architecture, so
	// this may skip db data clean-up for other architectures; confirm the intent.
	if len(dbPods) <= 0 || len(cmsPods) <= 0 {
		return nil
	}

	// createJob builds and submits the clean-up Job for a single pod.
	createJob := func(pod v1.Pod) {
		nodeName := pod.Spec.NodeName
		if nodeName == "" {
			klog.Info("deleteGSDate get pod nodeName is nil")
			return
		}
		jobName := fmt.Sprintf("%v-%v", pod.Name, "del")

		// cms and db pods use different host directories; the cms Job waits 30s
		// before deleting.
		var hostPath string
		var deleteDataCmd string
		if strings.Contains(pod.Name, "cms") {
			hostPath = fmt.Sprintf("/data/cmdb/%v", gs.Name+util.CmsSuffixString)
			deleteDataCmd = "sleep 30; rm -rf /data/**"
		} else {
			hostPath = fmt.Sprintf("/data/cmdb/%v", gs.Name+"-db")
			deleteDataCmd = "rm -rf /data/**"
		}

		job := &v12.Job{
			ObjectMeta: metaV1.ObjectMeta{
				Name:      jobName,
				Namespace: pod.Namespace,
				Labels:    util.GetJobLabels(gs, jobName),
			},
			Spec: v12.JobSpec{
				TTLSecondsAfterFinished: &ttl,
				Template: v1.PodTemplateSpec{
					ObjectMeta: metaV1.ObjectMeta{
						Labels:    util.GetJobLabels(gs, jobName),
						Name:      jobName,
						Namespace: pod.Namespace,
					},
					Spec: v1.PodSpec{
						// Force the Job onto the node that hosted the pod, since
						// the data lives in a hostPath on that node.
						Affinity: &v1.Affinity{
							NodeAffinity: &v1.NodeAffinity{
								RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
									NodeSelectorTerms: []v1.NodeSelectorTerm{
										{
											MatchExpressions: []v1.NodeSelectorRequirement{
												{
													Key:      util.HostnameKey,
													Operator: v1.NodeSelectorOpIn,
													Values: []string{
														nodeName,
													},
												},
											},
										},
									},
								},
							},
						},
						Containers: []v1.Container{
							{
								Command: []string{"sh", "-c", deleteDataCmd},
								Image:   util.BusyBox,
								Name:    jobName,
								Resources: v1.ResourceRequirements{
									Limits: map[v1.ResourceName]resource.Quantity{
										v1.ResourceCPU:    resource.MustParse("100m"),
										v1.ResourceMemory: resource.MustParse("128Mi"),
									},
									Requests: map[v1.ResourceName]resource.Quantity{
										v1.ResourceCPU:    resource.MustParse("100m"),
										v1.ResourceMemory: resource.MustParse("128Mi"),
									},
								},
								VolumeMounts: []v1.VolumeMount{
									{
										Name: util.VolumeGaussData,
										// NOTE(review): relative mount path while the
										// command deletes under /data; confirm it
										// resolves to /data in the container.
										MountPath: "data",
									},
								},
							},
						},
						HostNetwork:   true,
						RestartPolicy: v1.RestartPolicyOnFailure,
						Volumes: []v1.Volume{
							{
								Name: util.VolumeGaussData,
								VolumeSource: v1.VolumeSource{
									HostPath: &v1.HostPathVolumeSource{
										Path: hostPath,
									},
								},
							},
						},
					},
				},
			},
		}
		if err := r.K8sClient.Create(job); err != nil {
			klog.Errorf("delete gs data error %s", err)
		}
	}

	// Submit all Jobs concurrently and wait until every submission returned.
	var pending sync.WaitGroup
	start := func(pod *v1.Pod) {
		pending.Add(1)
		go func(p v1.Pod) {
			defer pending.Done()
			createJob(p)
		}(*pod)
	}

	for _, pod := range dbPods {
		start(pod)
	}
	for _, pod := range cmsPods {
		start(pod)
	}

	pending.Wait()
	return nil
}

// DeleteGaussClusterPV deletes every PersistentVolume selected by the cluster's
// PV/PVC labels. It stops at, logs and returns the first error.
func (r *ClusterManagement) DeleteGaussClusterPV(gs *gsv1.OpenGaussCluster) error {
	volumes, err := r.K8sClient.GetPvList(SetPvAndPvcLabels(gs))
	if err != nil {
		klog.Errorf("Get pv list err %s", err)
		return err
	}

	for i := range volumes {
		target := volumes[i]
		if err := r.K8sClient.DeletePv(target.Name, target.Namespace); err != nil {
			klog.Errorf("delete pv list err %s", err)
			return err
		}
	}
	return nil
}

// NewVersionedDbPod builds the db pod for the given ordinal: the template pod
// with its data volume and its readiness/liveness probes attached.
//
// Version handling is not implemented yet (left as a to-do by the original
// author).
func NewVersionedDbPod(gs *gsv1.OpenGaussCluster, ordinal int) *v1.Pod {
	pod := getDbPodFromTemplate(gs, ordinal)
	setDbVolume(gs, pod, ordinal)
	setDbReadnessAndLivenes(pod)
	return pod
}

// setDbReadnessAndLivenes attaches the readiness and liveness probes to every
// container of dbpod whose name is util.ContainerNameGaussDb.
func setDbReadnessAndLivenes(dbpod *v1.Pod) {
	containers := dbpod.Spec.Containers
	for idx := range containers {
		if containers[idx].Name != util.ContainerNameGaussDb {
			continue
		}
		containers[idx].ReadinessProbe = setReadiness()
		containers[idx].LivenessProbe = setLiveness()
	}
}

// setReadiness returns the readiness probe for the db container: it executes
// "sh /readnesscheck.sh" every 5s with a 3s timeout, failing after 3 consecutive
// failures and succeeding after 1 success.
func setReadiness() *v1.Probe {
	check := &v1.ExecAction{
		Command: []string{"sh", "/readnesscheck.sh"},
	}

	probe := &v1.Probe{}
	probe.ProbeHandler = v1.ProbeHandler{Exec: check}
	probe.PeriodSeconds = 5
	probe.FailureThreshold = 3
	probe.TimeoutSeconds = 3
	probe.SuccessThreshold = 1
	return probe
}

// setLiveness returns the liveness probe for the db container: after an initial
// delay of 300s it executes "sh /livenesscheck.sh" every 5s with a 3s timeout,
// failing after 3 consecutive failures and succeeding after 1 success.
func setLiveness() *v1.Probe {
	check := &v1.ExecAction{
		Command: []string{"sh", "/livenesscheck.sh"},
	}

	probe := &v1.Probe{}
	probe.ProbeHandler = v1.ProbeHandler{Exec: check}
	probe.PeriodSeconds = 5
	probe.InitialDelaySeconds = 300
	probe.FailureThreshold = 3
	probe.TimeoutSeconds = 3
	probe.SuccessThreshold = 1
	return probe
}

// getExporterPodFromTemplate builds the exporter pod of gs: a single exporter
// container using the exporter image, the data-source environment variable, the
// DB resource settings and the exporter port, owned by gs.
func getExporterPodFromTemplate(gs *gsv1.OpenGaussCluster) *v1.Pod {
	meta := metaV1.ObjectMeta{
		Name:            GetExporterPodName(gs.Name),
		Namespace:       gs.Namespace,
		Labels:          setExporterPodLabels(gs, util.PodTypeExporter),
		Annotations:     getExporterPodsAnnotationSet(gs),
		OwnerReferences: util.MakeOwnerRef(gs),
	}

	exporter := v1.Container{
		Name:      util.ContainerNameGaussExporter,
		Image:     gs.Spec.Images.GaussExporter,
		Env:       setGaussExporterEnv(gs),
		Resources: getContainerResources(&gs.Spec.DBResources),
		Ports: []v1.ContainerPort{
			{ContainerPort: util.ContainerPortExporter},
		},
	}

	return &v1.Pod{
		ObjectMeta: meta,
		Spec: v1.PodSpec{
			Containers: []v1.Container{exporter},
		},
	}
}

// getCmsPodFromTemplate builds the cms pod for the given ordinal. The pod runs
// the GaussDB image with the cms environment, is owned by gs, uses its own name
// as hostname and the matching cms service name as subdomain, and carries an
// anti-affinity term built from the cluster name and the cms pod type.
func getCmsPodFromTemplate(gs *gsv1.OpenGaussCluster, ordinal int) *v1.Pod {
	meta := metaV1.ObjectMeta{
		Name:            getCmsPodName(gs.Name, ordinal),
		Namespace:       gs.Namespace,
		Labels:          setCmsPodLabels(gs, util.PodTypeCMS, ordinal),
		Annotations:     setDbAndCmsPodAnnotation(gs, util.PodTypeCMS, ordinal),
		OwnerReferences: util.MakeOwnerRef(gs),
	}

	cms := v1.Container{
		Name:         util.ContainerNameGaussCms,
		Image:        gs.Spec.Images.GaussDB,
		Command:      util.Command(util.GaussDbCommand),
		Env:          setGaussCmsEnv(gs),
		Resources:    getContainerResources(&gs.Spec.DBResources),
		VolumeMounts: setGaussDbVolumeMounts(),
	}

	spec := v1.PodSpec{
		Containers: []v1.Container{cms},
		Affinity:   setPodAntiAffinity(util.PodTypeKey, util.PodTypeCMS, gs.Name),
		Hostname:   getCmsPodName(gs.Name, ordinal),
		Subdomain:  getCmsSvcName(gs.Name, ordinal),
	}

	return &v1.Pod{ObjectMeta: meta, Spec: spec}
}

// NewVersionedExporterPod builds the exporter pod of gs from its template.
//
// Version handling is not implemented yet (left as a to-do by the original
// author).
func NewVersionedExporterPod(gs *gsv1.OpenGaussCluster) *v1.Pod {
	exporterPod := getExporterPodFromTemplate(gs)
	return exporterPod
}

// NewVersionedCmsPod builds the cms pod for the given ordinal: the template pod
// with its data volume attached.
//
// Version handling is not implemented yet (left as a to-do by the original
// author).
func NewVersionedCmsPod(gs *gsv1.OpenGaussCluster, ordinal int) *v1.Pod {
	pod := getCmsPodFromTemplate(gs, ordinal)
	setCmsVolume(gs, pod, ordinal)
	return pod
}

// setDbVolume replaces the volumes of dbPod with the single data volume backed
// by the db claim of the given ordinal.
func setDbVolume(gs *gsv1.OpenGaussCluster, dbPod *v1.Pod, ordinal int) {
	dataVolume := SetVolumeGaussData(gs.Name, ordinal)
	dbPod.Spec.Volumes = []v1.Volume{*dataVolume}
}

// setCmsVolume replaces the volumes of cmsPod with the single data volume backed
// by the cms claim of the given ordinal. The claim name is derived from the
// cluster name with util.CmsSuffixString appended.
func setCmsVolume(gs *gsv1.OpenGaussCluster, cmsPod *v1.Pod, ordinal int) {
	cmsName := gs.Name + util.CmsSuffixString
	dataVolume := SetVolumeGaussData(cmsName, ordinal)
	cmsPod.Spec.Volumes = []v1.Volume{*dataVolume}
}

// SetVolumeGaussData returns the data volume named util.VolumeGaussData that is
// backed by the PersistentVolumeClaim derived from gsName and ordinal.
func SetVolumeGaussData(gsName string, ordinal int) *v1.Volume {
	claim := &v1.PersistentVolumeClaimVolumeSource{
		ClaimName: getPvcName(gsName, ordinal),
	}

	volume := &v1.Volume{Name: util.VolumeGaussData}
	volume.VolumeSource = v1.VolumeSource{PersistentVolumeClaim: claim}
	return volume
}

// IsCreated reports whether pod has a non-empty status phase, which this file
// uses to tell pods listed from the cluster apart from freshly built objects.
func IsCreated(pod *v1.Pod) bool {
	phase := pod.Status.Phase
	return len(phase) > 0
}

// IsRunningAndReady reports whether pod is in the Running phase and its Ready
// condition is true.
func IsRunningAndReady(pod *v1.Pod) bool {
	if pod.Status.Phase != v1.PodRunning {
		return false
	}
	return isPodReady(pod)
}

// isPodReady reports whether pod has a Ready condition whose status is true.
func isPodReady(pod *v1.Pod) bool {
	_, ready := getPodCondition(&pod.Status, v1.PodReady)
	if ready == nil {
		return false
	}
	return ready.Status == v1.ConditionTrue
}

// getPodCondition looks up the first condition of the given type in status. It
// returns the condition's index and a pointer to it, or (-1, nil) when status is
// nil, has no conditions, or holds no condition of that type.
func getPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
	if status == nil || status.Conditions == nil {
		return -1, nil
	}

	for idx := range status.Conditions {
		candidate := &status.Conditions[idx]
		if candidate.Type == conditionType {
			return idx, candidate
		}
	}
	return -1, nil
}

// getDbPodFromTemplate builds the db pod for the given ordinal. The pod runs the
// GaussDB image with the db environment and TCP port 5432, is owned by gs, uses
// its own name as hostname and the matching db service name as subdomain, and
// carries an anti-affinity term built from the cluster name and the db pod type.
func getDbPodFromTemplate(gs *gsv1.OpenGaussCluster, ordinal int) *v1.Pod {
	meta := metaV1.ObjectMeta{
		Name:            getDbPodName(gs.Name, ordinal),
		Namespace:       gs.Namespace,
		Labels:          setDbPodLabels(gs, util.PodTypeDB, ordinal),
		Annotations:     setDbAndCmsPodAnnotation(gs, util.PodTypeDB, ordinal),
		OwnerReferences: util.MakeOwnerRef(gs),
	}

	db := v1.Container{
		Name:         util.ContainerNameGaussDb,
		Image:        gs.Spec.Images.GaussDB,
		Command:      util.Command(util.GaussDbCommand),
		Env:          setGaussDbEnv(gs),
		Resources:    getContainerResources(&gs.Spec.DBResources),
		VolumeMounts: setGaussDbVolumeMounts(),
		Ports: []v1.ContainerPort{
			{ContainerPort: 5432, Protocol: "TCP"},
		},
	}

	spec := v1.PodSpec{
		Containers: []v1.Container{db},
		Affinity:   setPodAntiAffinity(util.PodTypeKey, util.PodTypeDB, gs.Name),
		Hostname:   getDbPodName(gs.Name, ordinal),
		Subdomain:  getDbSvcName(gs.Name, ordinal),
	}

	return &v1.Pod{ObjectMeta: meta, Spec: spec}
}

// setGaussDbVolumeMounts returns the volume mounts of a GaussDB container, which
// currently is just the data volume mount.
func setGaussDbVolumeMounts() []v1.VolumeMount {
	mounts := make([]v1.VolumeMount, 0, 1)
	mounts = append(mounts, mountVolumeGaussData())
	return mounts
}

// mountVolumeGaussData returns the mount of the util.VolumeGaussData volume at
// util.MountPatGaussData.
func mountVolumeGaussData() v1.VolumeMount {
	var mount v1.VolumeMount
	mount.Name = util.VolumeGaussData
	mount.MountPath = util.MountPatGaussData
	return mount
}

// setGaussExporterEnv returns the environment of the exporter container: the
// data-source variable holding the connection URLs of all data nodes.
func setGaussExporterEnv(gs *gsv1.OpenGaussCluster) []v1.EnvVar {
	dataSource := v1.EnvVar{
		Name:  util.ExporterDataSourceName,
		Value: getExporterDataSourceUrl(gs),
	}
	return []v1.EnvVar{dataSource}
}

// getExporterDataSourceUrl returns the connection URLs of all data nodes of gs,
// in ordinal order, joined by commas.
func getExporterDataSourceUrl(gs *gsv1.OpenGaussCluster) string {
	nodeCount := int(*gs.Spec.DataNodeNum)

	joined := ""
	for idx := 0; idx < nodeCount; idx++ {
		// Separator goes before every entry except the first.
		if idx > 0 {
			joined += ","
		}
		joined += getOneDataSourceUrl(gs, idx)
	}
	return joined
}

// getOneDataSourceUrl returns the postgresql:// connection URL of data node i:
// exporter credentials, host "<pod name>.<service name>", port 5432, database
// postgres, with sslmode disabled.
func getOneDataSourceUrl(gs *gsv1.OpenGaussCluster, i int) string {
	credentials := util.ExporterUsername + ":" + util.ExporterPassword
	host := getDbPodName(gs.Name, i) + "." + getDbSvcName(gs.Name, i)
	return "postgresql://" + credentials + "@" + host + ":5432/postgres?sslmode=disable"
}

// setGaussCmsEnv returns the environment of a cms container: the db environment
// followed by IS_CMS=true.
func setGaussCmsEnv(gs *gsv1.OpenGaussCluster) []v1.EnvVar {
	cmsFlag := v1.EnvVar{Name: "IS_CMS", Value: "true"}
	return append(setGaussDbEnv(gs), cmsFlag)
}

// setGaussDbEnv returns the environment of a db container: GS_PASSWORD and
// DN_NUM, the latter holding the decimal value of Spec.DataNodeNum.
func setGaussDbEnv(gs *gsv1.OpenGaussCluster) []v1.EnvVar {
	nodeCount := strconv.Itoa(int(*gs.Spec.DataNodeNum))

	password := v1.EnvVar{Name: "GS_PASSWORD", Value: util.GsPassword}
	dnNum := v1.EnvVar{Name: "DN_NUM", Value: nodeCount}
	return []v1.EnvVar{password, dnNum}
}

// getContainerResources converts the cpu and memory limits and requests of
// resources into the container resource requirements.
func getContainerResources(resources *gsv1.ResourceList) v1.ResourceRequirements {
	limits := v1.ResourceList{
		"cpu":    resources.Limits["cpu"],
		"memory": resources.Limits["memory"],
	}
	requests := v1.ResourceList{
		"cpu":    resources.Requests["cpu"],
		"memory": resources.Requests["memory"],
	}

	var requirements v1.ResourceRequirements
	requirements.Limits = limits
	requirements.Requests = requests
	return requirements
}

// setPodAntiAffinity returns a required pod anti-affinity term, with topology
// key util.HostnameKey, that selects pods whose util.OgcNameKey label is ogcName
// and whose key label is value.
func setPodAntiAffinity(key string, value string, ogcName string) *v1.Affinity {
	sameCluster := metaV1.LabelSelectorRequirement{
		Key:      util.OgcNameKey,
		Values:   []string{ogcName},
		Operator: util.OperatorIn,
	}
	sameKind := metaV1.LabelSelectorRequirement{
		Key:      key,
		Values:   []string{value},
		Operator: util.OperatorIn,
	}

	term := v1.PodAffinityTerm{
		LabelSelector: &metaV1.LabelSelector{
			MatchExpressions: []metaV1.LabelSelectorRequirement{sameCluster, sameKind},
		},
		TopologyKey: util.HostnameKey,
	}

	return &v1.Affinity{
		PodAntiAffinity: &v1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{term},
		},
	}
}

// getExporterPodsAnnotationSet returns the base pod annotations of gs extended
// with the scrape annotations: the exporter port and the should-be-scraped flag.
func getExporterPodsAnnotationSet(gs *gsv1.OpenGaussCluster) labels.Set {
	scrapePort := strconv.Itoa(util.ContainerPortExporter)

	annotations := getBasePodAnnotationSet(gs)
	annotations["gyt.io/scrape_port"] = scrapePort
	annotations["gyt.io/should_be_scraped"] = "true"
	return annotations
}

// setDbAndCmsPodAnnotation returns the base pod annotations of gs and, when
// findFixedPodIp yields a non-nil result for the pod, adds the
// util.CalicoFixedIpKey annotation. The value is a JSON-style string array for
// one or two addresses and the empty string for any other count.
func setDbAndCmsPodAnnotation(gs *gsv1.OpenGaussCluster, podType string, ordinal int) labels.Set {
	annotations := getBasePodAnnotationSet(gs)

	fixedIps := findFixedPodIp(gs, podType, ordinal)
	if fixedIps == nil {
		return annotations
	}

	encoded := ""
	switch len(fixedIps) {
	case 1:
		encoded = "[\"" + fixedIps[0] + "\"]"
	case 2:
		encoded = "[\"" + fixedIps[0] + "\",\"" + fixedIps[1] + "\"]"
	}
	annotations[util.CalicoFixedIpKey] = encoded
	return annotations
}

// getBasePodAnnotationSet returns a fresh annotation set holding a copy of the
// annotations of gs. The result is never nil.
//
// Callers in this file add pod-specific keys to the returned set. It used to be
// gs.Annotations itself, so those writes leaked into the cluster object and all
// pods built from the same cluster shared one map. Copying keeps every pod's
// annotations independent.
func getBasePodAnnotationSet(gs *gsv1.OpenGaussCluster) labels.Set {
	podAnnotations := make(labels.Set, len(gs.Annotations))
	for key, value := range gs.Annotations {
		podAnnotations[key] = value
	}
	return podAnnotations
}

// getPvcName returns the claim name "<gsName>-<ordinal>-pvc". For cms claims the
// callers in this file pass a gsName that already carries the cms suffix.
func getPvcName(gsName string, ordinal int) string {
	podName := getDbPodName(gsName, ordinal)
	return fmt.Sprintf("%s-pvc", podName)
}

// getPvName returns the volume name "<gsName>-<ordinal>-pv".
func getPvName(gsName string, ordinal int) string {
	podName := getDbPodName(gsName, ordinal)
	return fmt.Sprintf("%s-pv", podName)
}

// getDbSvcName returns the db service name "<gsName>-<ordinal>-svc".
func getDbSvcName(gsName string, ordinal int) string {
	podName := getDbPodName(gsName, ordinal)
	return fmt.Sprintf("%s-svc", podName)
}

// getCmsSvcName returns the cms service name "<gsName>-cms-<ordinal>-svc".
func getCmsSvcName(gsName string, ordinal int) string {
	podName := getCmsPodName(gsName, ordinal)
	return fmt.Sprintf("%s-svc", podName)
}

// getDbPodName returns the db pod name "<gsName>-<ordinal>".
func getDbPodName(gsName string, ordinal int) string {
	const pattern = "%s-%d"
	return fmt.Sprintf(pattern, gsName, ordinal)
}

// getCmsPodName returns the CMS pod name for the given cluster name and
// ordinal, formatted as "<gsName>-cms-<ordinal>".
func getCmsPodName(gsName string, ordinal int) string {
	const cmsPodNameFormat = "%s-cms-%d"
	return fmt.Sprintf(cmsPodNameFormat, gsName, ordinal)
}

// GetExporterPodName returns the exporter pod name for the given cluster
// name, formatted as "<gsName>-exporter".
func GetExporterPodName(gsName string) string {
	const exporterPodNameFormat = "%s-exporter"
	return fmt.Sprintf(exporterPodNameFormat, gsName)
}

// GetPvcOrdinal extracts the pod ordinal from a PVC name of the form
// "<prefix>-<ordinal>-pvc".
//
// The ordinal is the run of decimal digits that ends right before the
// four-byte "-pvc" suffix, so multi-digit ordinals (10 and above) are parsed
// in full. An error is returned, instead of panicking, when the name is too
// short or carries no digits in that position; the returned int is 0 then.
func GetPvcOrdinal(pvName string) (int, error) {
	const suffixLen = len("-pvc")

	end := len(pvName) - suffixLen
	start := end
	// Walk left over the digits that precede the suffix.
	for start > 0 && pvName[start-1] >= '0' && pvName[start-1] <= '9' {
		start--
	}
	if start == end {
		return 0, fmt.Errorf("pvc name %q has no ordinal before its suffix", pvName)
	}

	ordinal, err := strconv.Atoi(pvName[start:end])
	if err != nil {
		return 0, err
	}
	return ordinal, nil
}

// GetPvOrdinal extracts the pod ordinal from a PV name of the form
// "<prefix>-<ordinal>-pv".
//
// The ordinal is the run of decimal digits that ends right before the
// three-byte "-pv" suffix, so multi-digit ordinals (10 and above) are parsed
// in full. An error is returned, instead of panicking, when the name is too
// short or carries no digits in that position; the returned int is 0 then.
func GetPvOrdinal(pvName string) (int, error) {
	const suffixLen = len("-pv")

	end := len(pvName) - suffixLen
	start := end
	// Walk left over the digits that precede the suffix.
	for start > 0 && pvName[start-1] >= '0' && pvName[start-1] <= '9' {
		start--
	}
	if start == end {
		return 0, fmt.Errorf("pv name %q has no ordinal before its suffix", pvName)
	}

	ordinal, err := strconv.Atoi(pvName[start:end])
	if err != nil {
		return 0, err
	}
	return ordinal, nil
}

// GetSvcOrdinal extracts the pod ordinal from the name of a service named
// "<prefix>-<ordinal>-svc".
//
// The ordinal is the run of decimal digits that ends right before the
// four-byte "-svc" suffix, so multi-digit ordinals (10 and above) are parsed
// in full. An error is returned, instead of panicking, when the service is
// nil, its name is too short, or the name carries no digits in that position;
// the returned int is 0 then.
func GetSvcOrdinal(service *v1.Service) (int, error) {
	if service == nil {
		return 0, fmt.Errorf("service is nil")
	}
	const suffixLen = len("-svc")

	name := service.Name
	end := len(name) - suffixLen
	start := end
	// Walk left over the digits that precede the suffix.
	for start > 0 && name[start-1] >= '0' && name[start-1] <= '9' {
		start--
	}
	if start == end {
		return 0, fmt.Errorf("service name %q has no ordinal before its suffix", name)
	}

	ordinal, err := strconv.Atoi(name[start:end])
	if err != nil {
		return 0, err
	}
	return ordinal, nil
}

// getPodNameWithSvc returns the name of the pod a service of the given type
// and ordinal refers to: the DB pod name for util.PodTypeDB, the CMS pod name
// for util.PodTypeCMS, and the empty string for any other pod type.
func getPodNameWithSvc(gsName string, podType string, ord int) string {
	switch podType {
	case util.PodTypeDB:
		return getDbPodName(gsName, ord)
	case util.PodTypeCMS:
		return getCmsPodName(gsName, ord)
	default:
		return ""
	}
}

// SetPvAndPvcLabels returns the labels applied to the cluster's PVs and PVCs:
// the base labels plus util.StorageTypeKey set to the configured DB storage
// type.
func SetPvAndPvcLabels(gs *gsv1.OpenGaussCluster) map[string]string {
	storageLabels := setBaseLabels(gs)
	storageType := gs.Spec.DBStorage.StorageType
	storageLabels[util.StorageTypeKey] = storageType
	return storageLabels
}

// setPvAndPvcAnnotations returns the annotations applied to the cluster's PVs
// and PVCs, which are exactly the base annotations.
func setPvAndPvcAnnotations(gs *gsv1.OpenGaussCluster) map[string]string {
	return setBaseAnnotations(gs)
}

// setBaseLabels returns a fresh label map containing a copy of the cluster's
// own labels plus the two base labels every managed object carries:
// util.BaseNameKey set to util.BaseNameValue and util.OgcNameKey set to the
// cluster name.
//
// The map is always newly allocated, so it is safe to call on a cluster
// without labels (writing into its nil label map would panic) and callers may
// add keys without touching gs.
func setBaseLabels(gs *gsv1.OpenGaussCluster) map[string]string {
	// +2 leaves room for the two base labels added below.
	baseLabels := make(map[string]string, len(gs.Labels)+2)
	for key, value := range gs.Labels {
		baseLabels[key] = value
	}
	baseLabels[util.BaseNameKey] = util.BaseNameValue
	baseLabels[util.OgcNameKey] = gs.Name
	return baseLabels
}

// setBaseAnnotations returns the annotations of a deep copy of the cluster
// object, so the result does not alias gs.Annotations.
func setBaseAnnotations(gs *gsv1.OpenGaussCluster) map[string]string {
	return gs.DeepCopy().Annotations
}

// setHeadlessServiceLabels returns the labels for a headless service: the
// base labels plus util.PodNameKey set to name.
func setHeadlessServiceLabels(gs *gsv1.OpenGaussCluster, name string) map[string]string {
	headlessLabels := setBaseLabels(gs)
	headlessLabels[util.PodNameKey] = name
	return headlessLabels
}

// setServicesLabels returns the labels for the service of one specific pod:
// the per-type service labels plus util.PodNameKey set to the name of the pod
// with the given type and ordinal.
func setServicesLabels(gs *gsv1.OpenGaussCluster, podType string, ord int) map[string]string {
	serviceLabels := SetServicesLabelsByType(gs, podType)
	targetPod := getPodNameWithSvc(gs.Name, podType, ord)
	serviceLabels[util.PodNameKey] = targetPod
	return serviceLabels
}

// SetServicesLabelsByType returns the labels shared by all services of one
// pod type: the base labels plus util.PodTypeKey set to podType.
func SetServicesLabelsByType(gs *gsv1.OpenGaussCluster, podType string) map[string]string {
	typeLabels := setBaseLabels(gs)
	typeLabels[util.PodTypeKey] = podType
	return typeLabels
}

// setDbPodLabels returns the labels for the DB pod with the given ordinal:
// the per-type pod labels plus util.PodNameKey set to the DB pod name.
func setDbPodLabels(gs *gsv1.OpenGaussCluster, podType string, ordinal int) map[string]string {
	dbLabels := SetPodsLabelsByType(gs, podType)
	dbPodName := getDbPodName(gs.Name, ordinal)
	dbLabels[util.PodNameKey] = dbPodName
	return dbLabels
}

// setCmsPodLabels returns the labels for the CMS pod with the given ordinal:
// the per-type pod labels plus util.PodNameKey set to the CMS pod name.
func setCmsPodLabels(gs *gsv1.OpenGaussCluster, podType string, ordinal int) map[string]string {
	cmsLabels := SetPodsLabelsByType(gs, podType)
	cmsPodName := getCmsPodName(gs.Name, ordinal)
	cmsLabels[util.PodNameKey] = cmsPodName
	return cmsLabels
}

// setExporterPodLabels returns the labels for the exporter pod: the per-type
// pod labels plus util.PodNameKey set to the exporter pod name.
func setExporterPodLabels(gs *gsv1.OpenGaussCluster, podType string) map[string]string {
	exporterLabels := SetPodsLabelsByType(gs, podType)
	exporterPodName := GetExporterPodName(gs.Name)
	exporterLabels[util.PodNameKey] = exporterPodName
	return exporterLabels
}

// SetPodsLabelsByType returns the labels shared by all pods of one type: the
// base labels plus util.PodTypeKey set to podType.
func SetPodsLabelsByType(gs *gsv1.OpenGaussCluster, podType string) map[string]string {
	typeLabels := setBaseLabels(gs)
	typeLabels[util.PodTypeKey] = podType
	return typeLabels
}

// NewCmsService builds the Service object for the CMS pod with the given
// ordinal.
//
// The service lives in the cluster's namespace, is owned by the cluster,
// reuses the cluster's annotations, and uses the per-pod CMS service labels
// both as its own labels and as its selector (two separately built maps). It
// exposes a single port, 80, and prefers dual-stack IP families.
func NewCmsService(gs *gsv1.OpenGaussCluster, ord int) *v1.Service {
	ipFamilyPolicy := v1.IPFamilyPolicyPreferDualStack

	meta := metaV1.ObjectMeta{
		Name:            getCmsSvcName(gs.Name, ord),
		Namespace:       gs.Namespace,
		Labels:          setServicesLabels(gs, util.PodTypeCMS, ord),
		Annotations:     gs.Annotations,
		OwnerReferences: util.MakeOwnerRef(gs),
	}
	spec := v1.ServiceSpec{
		IPFamilyPolicy: &ipFamilyPolicy,
		Selector:       setServicesLabels(gs, util.PodTypeCMS, ord),
		Ports:          []v1.ServicePort{{Port: 80}},
	}

	return &v1.Service{ObjectMeta: meta, Spec: spec}
}

// PersistentVolumeClaim builds the PVC for the pod of the given type and
// ordinal, with VolumeName set to the name of the matching PV.
//
// For util.PodTypeCMS the PV and PVC names are derived from the cluster name
// with util.CmsSuffixString appended; for util.PodTypeDB from the plain
// cluster name. Any other pod type leaves both names empty.
//
// The claim requests gs.Spec.DBStorage.DiskCapacity with ReadWriteOnce
// access. Its StorageClassName points at the field inside gs, not at a copy.
func PersistentVolumeClaim(gs *gsv1.OpenGaussCluster, ord int, podType string) *v1.PersistentVolumeClaim {
	var pvName, pvcName string
	switch podType {
	case util.PodTypeCMS:
		pvName = getPvName(gs.Name+util.CmsSuffixString, ord)
		pvcName = getPvcName(gs.Name+util.CmsSuffixString, ord)
	case util.PodTypeDB:
		pvName = getPvName(gs.Name, ord)
		pvcName = getPvcName(gs.Name, ord)
	}

	meta := metaV1.ObjectMeta{
		Name:            pvcName,
		Labels:          SetPvAndPvcLabels(gs),
		Namespace:       gs.Namespace,
		Annotations:     setPvAndPvcAnnotations(gs),
		OwnerReferences: util.MakeOwnerRef(gs),
	}
	spec := v1.PersistentVolumeClaimSpec{
		AccessModes: []v1.PersistentVolumeAccessMode{"ReadWriteOnce"},
		Resources: v1.ResourceRequirements{
			Requests: v1.ResourceList{
				"storage": gs.Spec.DBStorage.DiskCapacity,
			},
		},
		VolumeName:       pvName,
		StorageClassName: &gs.Spec.DBStorage.StorageClassName,
	}

	return &v1.PersistentVolumeClaim{ObjectMeta: meta, Spec: spec}
}

// NewPersistentVolume builds the PV for the pod of the given type and
// ordinal.
//
// For util.PodTypeDB the PV name is derived from the plain cluster name; for
// util.PodTypeCMS from the cluster name with util.CmsSuffixString appended.
// Any other pod type leaves the name empty.
//
// The volume has capacity gs.Spec.DBStorage.DiskCapacity, ReadWriteOnce
// access, the configured storage class, and the volume source chosen by
// setPersistentVolumeSource.
func NewPersistentVolume(gs *gsv1.OpenGaussCluster, ord int, podType string) *v1.PersistentVolume {
	var pvName string
	switch podType {
	case util.PodTypeDB:
		pvName = getPvName(gs.Name, ord)
	case util.PodTypeCMS:
		pvName = getPvName(gs.Name+util.CmsSuffixString, ord)
	}

	meta := metaV1.ObjectMeta{
		Name:            pvName,
		Labels:          SetPvAndPvcLabels(gs),
		Annotations:     setPvAndPvcAnnotations(gs),
		OwnerReferences: util.MakeOwnerRef(gs),
	}
	spec := v1.PersistentVolumeSpec{
		Capacity: v1.ResourceList{
			"storage": gs.Spec.DBStorage.DiskCapacity,
		},
		PersistentVolumeSource: setPersistentVolumeSource(gs, podType),
		AccessModes:            []v1.PersistentVolumeAccessMode{"ReadWriteOnce"},
		StorageClassName:       gs.Spec.DBStorage.StorageClassName,
	}

	return &v1.PersistentVolume{ObjectMeta: meta, Spec: spec}
}

// setPersistentVolumeSource returns the volume source for a PV of the given
// pod type.
//
// For util.StorageTypeLocal storage it is a host path made of
// util.HostPathPrefix, the cluster name, and a per-type suffix
// (util.CmsSuffixString for CMS pods, util.DbSuffixString for everything
// else). For any other storage type the returned source is left empty.
func setPersistentVolumeSource(gs *gsv1.OpenGaussCluster, podType string) v1.PersistentVolumeSource {
	var source v1.PersistentVolumeSource

	if gs.Spec.DBStorage.StorageType != util.StorageTypeLocal {
		// Non-local ("share") storage: nothing is filled in here.
		return source
	}

	suffix := util.DbSuffixString
	if podType == util.PodTypeCMS {
		suffix = util.CmsSuffixString
	}
	source.HostPath = &v1.HostPathVolumeSource{
		Path: util.HostPathPrefix + gs.Name + suffix,
	}
	return source
}

// NewDbService builds the Service object for the DB pod with the given
// ordinal.
//
// The service lives in the cluster's namespace, is owned by the cluster,
// reuses the cluster's annotations, and uses the per-pod DB service labels
// both as its own labels and as its selector (two separately built maps).
// Ports and service type follow the cluster's network type (see setPorts and
// setServiceSpecType); dual-stack IP families are preferred.
func NewDbService(gs *gsv1.OpenGaussCluster, ord int) *v1.Service {
	ipFamilyPolicy := v1.IPFamilyPolicyPreferDualStack

	meta := metaV1.ObjectMeta{
		Name:            getDbSvcName(gs.Name, ord),
		Namespace:       gs.Namespace,
		Labels:          setServicesLabels(gs, util.PodTypeDB, ord),
		Annotations:     gs.Annotations,
		OwnerReferences: util.MakeOwnerRef(gs),
	}
	spec := v1.ServiceSpec{
		Ports:          setPorts(gs, ord),
		Selector:       setServicesLabels(gs, util.PodTypeDB, ord),
		Type:           setServiceSpecType(gs.Spec.NetworkType),
		IPFamilyPolicy: &ipFamilyPolicy,
	}

	return &v1.Service{ObjectMeta: meta, Spec: spec}
}

// setPorts returns the single service port of a DB service: TCP 5432
// targeting container port 5432, with the node port chosen by setNodePort for
// the given ordinal.
func setPorts(gs *gsv1.OpenGaussCluster, ord int) []v1.ServicePort {
	dbPort := v1.ServicePort{
		Protocol:   "TCP",
		Port:       5432,
		TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 5432},
		NodePort:   setNodePort(gs, ord),
	}
	return []v1.ServicePort{dbPort}
}

// setNodePort returns the node port for the DB service with the given
// ordinal.
//
// With util.NetworkTypeNodePort networking it is the configured base port
// (gs.Spec.Ports) plus the ordinal. It returns 0 for every other network
// type, and also when no base port is configured, instead of dereferencing a
// nil pointer.
func setNodePort(gs *gsv1.OpenGaussCluster, ord int) int32 {
	if gs.Spec.NetworkType != util.NetworkTypeNodePort || gs.Spec.Ports == nil {
		return 0
	}
	return *gs.Spec.Ports + int32(ord)
}

// setServiceSpecType maps the cluster network type to a service type:
// "NodePort" for util.NetworkTypeNodePort, and the empty service type for
// anything else.
func setServiceSpecType(networkType string) v1.ServiceType {
	var serviceType v1.ServiceType
	if networkType == util.NetworkTypeNodePort {
		serviceType = "NodePort"
	}
	return serviceType
}

// NewHeadlessService builds the headless Service (ClusterIP "None") for the
// given pod.
//
// The service is named after the pod, lives in the cluster's namespace,
// reuses the cluster's annotations, and uses the headless service labels both
// as its own labels and as its selector (two separately built maps). Its only
// owner reference is built from the pod's APIVersion, Kind, Name and UID.
func NewHeadlessService(gs *gsv1.OpenGaussCluster, pod *v1.Pod) *v1.Service {
	ipFamilyPolicy := v1.IPFamilyPolicyPreferDualStack

	podOwner := metaV1.OwnerReference{
		APIVersion: pod.APIVersion,
		Kind:       pod.Kind,
		Name:       pod.Name,
		UID:        pod.UID,
	}
	meta := metaV1.ObjectMeta{
		Name:            pod.Name,
		Namespace:       gs.Namespace,
		Labels:          setHeadlessServiceLabels(gs, pod.Name),
		Annotations:     gs.Annotations,
		OwnerReferences: []metaV1.OwnerReference{podOwner},
	}
	spec := v1.ServiceSpec{
		ClusterIP:      "None",
		Selector:       setHeadlessServiceLabels(gs, pod.Name),
		IPFamilyPolicy: &ipFamilyPolicy,
	}

	return &v1.Service{ObjectMeta: meta, Spec: spec}
}

// setVolumeNodeAffinity returns a volume node affinity with one required
// selector term: util.HostnameKey "In" the single value nodeName.
func setVolumeNodeAffinity(nodeName string) *v1.VolumeNodeAffinity {
	hostnameMatch := v1.NodeSelectorRequirement{
		Key:      util.HostnameKey,
		Values:   []string{nodeName},
		Operator: "In",
	}
	term := v1.NodeSelectorTerm{
		MatchExpressions: []v1.NodeSelectorRequirement{hostnameMatch},
	}
	selector := &v1.NodeSelector{
		NodeSelectorTerms: []v1.NodeSelectorTerm{term},
	}
	return &v1.VolumeNodeAffinity{Required: selector}
}

// findFixedPodIp returns the IPs recorded in the cluster status for the pod
// of the given type and ordinal.
//
// It returns nil unless the cluster uses util.NetworkTypeNodePort networking.
// DB pods are looked up in gs.Status.Conditions and CMS pods in
// gs.Status.CmsConditions, using the ordinal as the slice index. An unknown
// pod type or an ordinal outside the recorded conditions also yields nil.
func findFixedPodIp(gs *gsv1.OpenGaussCluster, podType string, ordinal int) []string {
	if gs.Spec.NetworkType != util.NetworkTypeNodePort {
		return nil
	}

	var podConditions []gsv1.OpenGaussPodCondition
	switch podType {
	case util.PodTypeDB:
		podConditions = gs.Status.Conditions
	case util.PodTypeCMS:
		podConditions = gs.Status.CmsConditions
	}

	if ordinal < 0 || ordinal >= len(podConditions) {
		return nil
	}
	return podConditions[ordinal].Ips
}

// recordPodIp reconciles the DB pod conditions of the freshly computed status
// (gsStatus) with those already stored on the cluster (gs.Status), slot by
// slot, so previously recorded pod IPs are kept and an IP change is reported.
//
// It does nothing unless the cluster uses util.NetworkTypeNodePort networking
// and has previously stored conditions. The stored conditions are truncated
// or zero-padded to gs.Spec.DataNodeNum entries (on a local slice; gs.Status
// is not modified). For each slot i:
//   - stored condition empty: the slot is left untouched;
//   - new condition empty: the stored condition is copied into gsStatus,
//     provided its pod name carries ordinal i;
//   - both present with ordinal i: missing new IPs are filled from the stored
//     ones, and if the IPs differ gsStatus.State is set to "Error" with a
//     conflict message.
//
// Only slots that exist in gsStatus.Conditions are visited, so a new status
// with fewer than DataNodeNum conditions no longer causes an out-of-range
// panic.
func recordPodIp(gs *gsv1.OpenGaussCluster, gsStatus *gsv1.OpenGaussClusterStatus) {

	if util.NetworkTypeNodePort != gs.Spec.NetworkType {
		return
	}

	oldConditions := gs.Status.Conditions
	newConditions := gsStatus.Conditions
	if oldConditions == nil {
		return
	}
	size := int(*gs.Spec.DataNodeNum)
	// TODO: take scale-out / scale-in of the node count into account.
	oldLen := len(oldConditions)
	if oldLen > size {
		oldConditions = oldConditions[0:size]
	}
	if oldLen < size {
		statuses := make([]gsv1.OpenGaussPodCondition, size)
		copy(statuses, oldConditions)
		oldConditions = statuses
	}

	// Never index past the conditions actually present in the new status.
	limit := size
	if len(newConditions) < limit {
		limit = len(newConditions)
	}

	for i := 0; i < limit; i++ {
		// TODO: take scale-out / scale-in of the node count into account.
		oldNil := util.IsEmptyStruct(oldConditions[i])
		newNil := util.IsEmptyStruct(newConditions[i])

		// Nothing stored for this slot: keep whatever the new status has.
		if oldNil {
			continue
		}

		// Stored condition exists but the new one is empty: carry it over.
		if newNil {
			// NOTE(review): the error from GetPodOrdinal is ignored, as
			// before; what it returns for a malformed name is not visible
			// here.
			ordinal, _ := util.GetPodOrdinal(oldConditions[i].Name)
			if ordinal != i {
				continue
			}
			newConditions[i] = oldConditions[i]
			continue
		}

		// Both present: only compare when both names belong to slot i.
		newOrd, _ := util.GetPodOrdinal(newConditions[i].Name)
		oldOrd, _ := util.GetPodOrdinal(oldConditions[i].Name)
		if newOrd != i || oldOrd != i {
			continue
		}
		if len(newConditions[i].Ips) == 0 {
			newConditions[i].Ips = oldConditions[i].Ips
		}
		if oldConditions[i].Ips == nil {
			continue
		}

		// Check whether the IPs are still the same.
		if !isEqualIps(newConditions[i].Ips, oldConditions[i].Ips) {
			gsStatus.State = "Error"
			gsStatus.Message = fmt.Sprintf("%s pod Ip conflict, old pod Ip is %s, "+
				"new pod is %s", newConditions[i].Name, oldConditions[i].Ips, newConditions[i].Ips)
		}
	}
}

// isEqualIps reports whether the new IP list is compatible with the old one.
//
// The lists must have the same length, and every non-empty old entry must
// equal the new entry at the same position. An empty old entry matches any
// new value.
func isEqualIps(newIps, oldIps []string) bool {
	if len(newIps) != len(oldIps) {
		return false
	}
	for idx, previous := range oldIps {
		if previous == "" {
			// No address recorded before at this position: nothing to compare.
			continue
		}
		if newIps[idx] != previous {
			return false
		}
	}
	return true
}

// MakePodCondition builds the status condition recorded for a pod: its name,
// all pod IPs (in the order reported in pod.Status.PodIPs), the node name and
// host IP it runs on, and its phase and status message.
func MakePodCondition(pod *v1.Pod) *gsv1.OpenGaussPodCondition {
	ips := make([]string, 0, len(pod.Status.PodIPs))
	for _, podIP := range pod.Status.PodIPs {
		ips = append(ips, podIP.IP)
	}

	condition := gsv1.OpenGaussPodCondition{
		Name:     pod.Name,
		Ips:      ips,
		NodeName: pod.Spec.NodeName,
		NodeIp:   pod.Status.HostIP,
		Phase:    pod.Status.Phase,
		Message:  pod.Status.Message,
	}
	return &condition
}

// updateOpenGaussClusterStatus finalizes gsStatus and persists it on the
// cluster when it differs from the status already stored there.
//
// It sets the replica count from the spec, reconciles recorded pod IPs
// (recordPodIp), copies node states from clusterMsgMap (echoNodeState), and
// then writes the status through the Kubernetes client only if it is not
// deeply equal to gs.Status. Any error from that update is returned.
func (r *ClusterManagement) updateOpenGaussClusterStatus(gs *gsv1.OpenGaussCluster, gsStatus *gsv1.OpenGaussClusterStatus, clusterMsgMap map[string]string) error {
	gsStatus.Replicas = gs.Spec.DataNodeNum
	recordPodIp(gs, gsStatus)
	echoNodeState(gsStatus, clusterMsgMap)

	// Only touch the API server when the status actually changed.
	if changed := !reflect.DeepEqual(gs.Status, *gsStatus); changed {
		gs.Status = *gsStatus
		if err := r.K8sClient.UpdateOpenGaussClusterStatus(gs); err != nil {
			return err
		}
	}
	return nil
}

// echoNodeState copies per-pod states from clusterMsgMap, keyed by pod name,
// into the State of every CMS and DB condition of gsStatus.
//
// A nil map leaves the status untouched. With a non-nil map, a condition
// whose name is not a key in the map has its State set to the empty value.
func echoNodeState(gsStatus *gsv1.OpenGaussClusterStatus, clusterMsgMap map[string]string) {
	if clusterMsgMap == nil {
		return
	}

	for i := range gsStatus.CmsConditions {
		cmsCondition := &gsStatus.CmsConditions[i]
		cmsCondition.State = clusterMsgMap[cmsCondition.Name]
	}
	for i := range gsStatus.Conditions {
		dbCondition := &gsStatus.Conditions[i]
		dbCondition.State = clusterMsgMap[dbCondition.Name]
	}
}
